/**
* Project Name:myservice
* Date:2018年12月16日
* Copyright (c) 2018, jingma All Rights Reserved.
*/

package cn.benma666.kettle.ljq;

import cn.benma666.constants.UtilConst;
import cn.benma666.domain.SysPtglXtxx;
import cn.benma666.domain.SysQxYhxx;
import cn.benma666.domain.SysSjglFile;
import cn.benma666.exception.MyException;
import cn.benma666.iframe.Conf;
import cn.benma666.iframe.MyParams;
import cn.benma666.iframe.Result;
import cn.benma666.kettle.domain.VJob;
import cn.benma666.kettle.jobentry.easyexpand.JobEntryEasyExpandRunBase;
import cn.benma666.kettle.mytuils.Kettle;
import cn.benma666.kettle.mytuils.KettleManager;
import cn.benma666.myutils.FileUtil;
import cn.benma666.myutils.Log;
import cn.benma666.myutils.StringUtil;
import cn.benma666.sjsj.myutils.ThreadPool;
import cn.benma666.sjsj.web.LjqManager;
import cn.benma666.sjsj.web.XtxxWebSocket;
import cn.benma666.sjzt.Db;
import cn.benma666.vo.WebSocketKhdxxVo;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang.StringUtils;
import org.beetl.sql.core.SqlId;
import org.pentaho.di.job.JobMeta;

import java.awt.image.BufferedImage;
import java.util.Base64;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * 作业拦截器 <br/>
 * date: 2018年12月16日 <br/>
 * @author jingma
 * @version 0.1
 */
public class JobLjq extends KettleLjq {
    /**
     * 作业日志监听Map
     */
    private static final Map<String,WebSocketKhdxxVo> zyrzjtMap = new ConcurrentHashMap<>();
    @Override
    public Result wsmddzc(MyParams myParams) {
        super.wsmddzc(myParams);
        WebSocketKhdxxVo wsk = myParams.getObject($_SYS_WSKHDXX, WebSocketKhdxxVo.class);
        String log = null;
        SysPtglXtxx xx = new SysPtglXtxx(log);
        xx.setMdddl(wsk.getMdddl());
        xx.setMddxl(wsk.getMddxl());
        xx.setTshh(wsk.getToken());
        if(KettleManager.JobMap.containsKey(wsk.getMddxl())){
            //当前节点上运行的作业
            log = KettleManager.getLog(wsk.getMddxl(),0).getMsg();
        }else {
            //目前没在当前节点运行
            String[] arr = wsk.getMddxl().split("&");
            JSONObject job = Db.use(arr[0]).findFirst(SqlId.of("kee", "selectJob"),
                    Db.buildKeyMap("id_job", arr[1]));
            if(!Conf.getAppdm().equals(requireNonNull(job,"没有找到该作业："+wsk.getMddxl())
                    .getString("ddjd"))&&StringUtil.isNotBlank(job.getString("ddjd"))){
                //调度节点非当前节点且调度节点非空，说明在其他节点调度。
                MyParams logJcxx = new MyParams(true);
                logJcxx.set("$.sjdx.dxdm","KETTLE_GLPT_ZYGL");
                logJcxx.set($_SYS_CLLX,"getLog");
                logJcxx.set("$.yobj.key",wsk.getMddxl());
                logJcxx.set("$.yobj.line",0);
                logJcxx.set("$.yobj.zyk",arr[0]);
                MyParams jcxx = LjqManager.jcxxByParams(logJcxx,myParams.getObject(KEY_USER, SysQxYhxx.class));
                //跨节点获取日志
                jcxx.set("$.sys.ctapp",job.getString("ddjd"));
                Result r = LjqManager.data(jcxx);
                if(r.isStatus()&&r.getData()!=null){
                    //正常获取日志
                    log = r.getMsg();
                    String key = wsk.getToken()+wsk.getMdd();
                    if(!zyrzjtMap.containsKey(key)){
                        zyrzjtMap.put(key,wsk);
                        ThreadPool.use().run(()->{
                            Integer line = r.getData(Integer.class);
                            //单独开一个线程持续获取日志进行客户端推送
                            while (true){
                                if(!zyrzjtMap.containsKey(key)){
                                    Log.getLogger().info("已经取消日志监听："+key);
                                    break;
                                }
                                jcxx.set($_OTHEROBJ_YSPARAMS+".yobj.line",line);
                                Result r1 = LjqManager.data(jcxx);
                                String log1;
                                if(r1.isStatus()){
                                    //正常获取日志
                                    log1 = r1.getMsg();
                                }else {
                                    log1 = "跨节点获取日志异常："+r1.getMsg();
                                }
                                if(StringUtil.isNotBlank(log1)){
                                    line = r1.getData(Integer.class);
                                    //有新消息
                                    xx.setXxnr(log1);
                                    r1 = XtxxWebSocket.sendMsg(xx,myParams.user());
                                    if(!r1.isStatus()){
                                        Log.getLogger().info("推送日志失败结束："+key);
                                        break;
                                    }
                                }
                                try {
                                    Thread.sleep(10000);
                                } catch (InterruptedException e) {
                                    Log.getLogger().info("日志监听结束："+key);
                                    break;
                                }
                            }
                        });
                    }else {
                        Log.getLogger().debug("该key已经有线程在处理日志读取了：{}",key);
                    }
                }else {
                    log = "跨节点获取日志异常："+r.getMsg();
                }
            }else {
                log = "该作业当前未运行。若想查看历史运行日志信息，请到【基础日志】页面查询并下载对应日志文件。";
            }
        }
        xx.setXxnr(log);
        XtxxWebSocket.sendMsg(xx,myParams.user());
        return success("初次查看，推送历史日志");
    }
    /**
     * websocket客户端取消事件
     */
    public Result wsmddqx(MyParams myParams){
        WebSocketKhdxxVo wsk = myParams.getObject($_SYS_WSKHDXX, WebSocketKhdxxVo.class);
        String key = wsk.getToken()+wsk.getMdd();
        //取消监听
        zyrzjtMap.remove(key);
        return super.wsmddqx(myParams);
    }

    @Override
    public Result select(MyParams myParams) {
        return super.select(myParams);
    }

    /**
     * 获取日志
     */
    public Result getLog(MyParams myParams) {
        return KettleManager.getLog(myParams.getString("$.yobj.key"),
                myParams.getIntValue("$.yobj.line"));
    }

    @Override
    public Result plsc(MyParams myParams) {
        //失败的作业数
        int flag = 0;
        List<VJob> jobs = select(myParams).getPageList(VJob.class);
        //批量删除作业
        for(VJob job : jobs){
            try {
                Kettle.use(job.getZyk()).delJob(job.getId_job());
                //删除对应的扩展信息
                MyParams zykzJcxx = LjqManager.jcxxByDxdm("KETTLE_GLPT_ZYKZ");
                zykzJcxx.set($_SYS_IDS,new Object[]{job.getId_job()});
                zykzJcxx.set($_SYS_CLLX,KEY_CLLX_DELETE);
                Result r = LjqManager.data(zykzJcxx);
                if(!r.isStatus()){
                    throw r.newMyException();
                }
            } catch (Exception e) {
                flag++;
                log.error("删除job失败:"+job, e);
            }
        }
        if(flag==0){
            return success("删除作业成功："+jobs.size());
        }else{
            return failed("删除成功作业数："+(jobs.size()-flag)+"，失败作业数："+flag+"，请查看系统日志分析原因！");
        }
    }

    /**
     * 启动作业
     */
    public Result qd(MyParams myParams) {
        //运行状态，默认启动失败
        String runStatus;
        int flag = 0;
        myParams.set("$.page.pageSize",10000);
        myParams.set("$.page.totalRequired",false);
        List<VJob> jobs = select(myParams).getPageList(VJob.class);
        //启动作业
        for(VJob job : jobs){
            if(!isBlank(job.getDdjd())&&!Conf.getAppdm().equals(job.getDdjd())){
                return failed(job.getName()+"：该任务不在本调度节点执行，请选择对应调度节点后执行");
            }
            //当前节点调度
            runStatus = KettleManager.START_FAILED;
            try {
                runStatus = KettleManager.startJob(job);
            } catch (Exception e) {
                flag++;
                log.error("启动job失败:"+job, e);
            }
            KettleManager.updateZykz(new VJob(getSjdx().getDxzt(),job.getId_job(),runStatus,Conf.getAppdm()));
        }
        if(flag==0){
            return success("作业启动成功："+jobs.size());
        }else{
            return failed("启动成功作业数："+(jobs.size()-flag)+"，失败作业数："+flag+"，请查看系统日志分析原因！");
        }
    }

    /**
     * 停止作业
     */
    public Result tz(MyParams myParams) {
        //运行状态，默认启动失败
        String runStatus;
        int flag = 0;
        myParams.set("$.page.pageSize",10000);
        myParams.set("$.page.totalRequired",false);
        List<VJob> jobs = select(myParams).getPageList(VJob.class);
        //停止作业
        for(VJob job : jobs){
            if(!isBlank(job.getDdjd())&&!Conf.getAppdm().equals(job.getDdjd())){
                return failed(job.getName()+"：该任务不在本调度节点执行，请选择对应调度节点后执行");
            }
            runStatus = KettleManager.STOP_FAILED;
            try {
                runStatus = KettleManager.stopJob(job);
            } catch (Exception e) {
                flag++;
                log.error("停止job失败:"+job, e);
            }
            KettleManager.updateZykz(new VJob(getSjdx().getDxzt(),job.getId_job(),runStatus));
        }
        if(flag==0){
            return success("作业停止成功："+jobs.size());
        }else{
            return failed("停止成功作业数："+(jobs.size()-flag)+"，失败作业数："+flag+"，请查看系统日志分析原因！");
        }
    }
    /**
     * 结束作业
     */
    public Result js(MyParams myParams) {
        int flag = 0;
        myParams.set("$.page.pageSize",10000);
        myParams.set("$.page.totalRequired",false);
        List<VJob> jobs = select(myParams).getPageList(VJob.class);
        //结束作业：强制杀死操作
        for(VJob job : jobs){
            if(!isBlank(job.getDdjd())&&!Conf.getAppdm().equals(job.getDdjd())){
                return failed(job.getName()+"：该任务不在本调度节点执行，请选择对应调度节点后执行");
            }
            try {
                KettleManager.killJob(job);
            } catch (Exception e) {
                flag++;
                log.error("结束job失败:"+job, e);
            }
        }
        if(flag==0){
            return success("作业结束成功："+jobs.size());
        }else{
            return failed("结束成功作业数："+(jobs.size()-flag)+"，失败作业数："+flag+"，请查看系统日志分析原因！");
        }
    }
    /**
     * 重新生成
     */
    public Result cxsc(MyParams myParams) {
        int flag = 0;
        myParams.set("$.page.pageSize",10000);
        myParams.set("$.page.totalRequired",false);
        List<JSONObject> jobs = select(myParams).getPageList(JSONObject.class);
        //重新生成
        myParams.set($_SYS_CLLX, KEY_CLLX_UPDATE);
        int qtzy = 0;
        KettleService ks = new KettleService(myParams);
        //需要调用每个作业的基础信息
        for(JSONObject job : jobs){
            try {
                if("dxlz".equals(job.getString("zylx"))){
                    job.put("cxsc", UtilConst.WHETHER_TRUE);
                    ks.setJob(job.toJavaObject(VJob.class));
                    ks.getJobDxlz();
                    myParams.put(KEY_OBJ, job);
                    myParams.put(KEY_YOBJ, job);
                    save(myParams);
                }else{
                    qtzy++;
                }
            } catch (Exception e) {
                flag++;
                log.error("重新生成失败:"+job, e);
            }
        }
        if(flag+qtzy==0){
            return success("重新生成成功："+jobs.size());
        }else{
            Result r = failed("已重新生成作业数："+(jobs.size()-flag-qtzy));
            if(flag>0){
                r.addMsg("失败作业数："+flag+"，请查看系统日志分析原因！");
            }else{
                r.setStatus(true);
            }
            if(qtzy>0){
                r.addMsg("重新生成功能只针对对象流转类型的作业,其他作业已自动忽略，其他作业数："+qtzy);
            }
            return r;
        }
    }
    /**
     * 重置作业
     */
    public Result cz(MyParams myParams) {
        int flag = 0;
        myParams.set("$.page.pageSize",10000);
        myParams.set("$.page.totalRequired",false);
        List<VJob> jobs = select(myParams).getPageList(VJob.class);
        for(VJob job : jobs){
            if(!isBlank(job.getDdjd())&&!Conf.getAppdm().equals(job.getDdjd())){
                return failed(job.getName()+"：该任务不在本调度节点执行，请选择对应调度节点后执行");
            }
            //重置作业，丢弃原有运行信息，用于卡死结束不掉的场景
            try {
                KettleManager.resetJob(job);
            } catch (Exception e) {
                flag++;
                log.error("重置job失败:"+job, e);
            }
        }
        if(flag==0){
            return success("作业重置成功："+jobs.size());
        }else{
            return failed("重置成功作业数："+(jobs.size()-flag)+"，失败作业数："+flag+"，请查看系统日志分析原因！");
        }
    }
    /**
     * 获取作业目录
     */
    public Result ml(MyParams myParams) {
        StringBuilder jg = new StringBuilder();
        myParams.set("$.page.pageSize",10000);
        myParams.set("$.page.totalRequired",false);
        List<VJob> jobs = select(myParams).getPageList(VJob.class);
        for(VJob job : jobs){
            //作业目录
            try {
                String dir = Kettle.use(job.getZyk()).getDirectory(job.getId_directory());
                jg.append("【").append(job.getName()).append("】：").append(dir).append("\n");
            } catch (Exception e) {
                log.error("获取作业目录失败:"+job, e);
                jg.append(job.getName()).append(":获取失败：")
                        .append(e.getMessage()).append("\n");
            }
        }
        return success(jg.toString());
    }
    /**
     * 获取作业图
     */
    public Result zyt(MyParams myParams) {
        myParams.set("$.page.pageSize",1);
        myParams.set("$.page.totalRequired",false);
        List<VJob> jobs = select(myParams).getPageList(VJob.class);
        if(jobs.size()==0){
            return failed("没有选择需要处理的作业");
        }
        VJob jobJson = jobs.get(0);
        //作业图
        try {
            SysSjglFile file = new SysSjglFile();
            file.setWjlx("png");
            file.setWjm(jobJson.getName()+"的作业图");
            file.setXzms(0);
            BufferedImage image = KettleManager.getJobImg(jobJson);
            JSONObject r = new JSONObject();
            r.put(KEY_FILE_BYTES, FileUtil.toBytes(image));
            r.put(KEY_FILE_OBJ, file);
            return success("获取作业图成功",r);
        } catch (Exception e) {
            log.error("获取作业图失败:"+jobJson, e);
            return failed("获取作业图失败，请查看系统日志分析原因:"+e.getMessage());
        }
    }

    /**
     * 导入作业
     */
    public Result drzy(MyParams myParams) {
        return failed("暂未实现");
    }
    /**
     * 获取KM默认配置
     */
    public Result getKmmrpz(MyParams myParams) {
        try {
            JSONObject yobj = myParams.getJSONObject(KEY_YOBJ);
            JSONObject obj = myParams.getJSONObject(KEY_OBJ);
            obj.putAll(yobj);
            String kmlm = obj.getString("kmlm");
            JobEntryEasyExpandRunBase ji = (JobEntryEasyExpandRunBase) Class.forName(kmlm).newInstance();
            JSONObject km = new JSONObject();
            km.put("kmpz", ji.getDefaultConfigInfo());
            return success("获取成功", km);
        } catch (Exception e) {
            log.error("获取配置失败"+getSjdx(), e);
            return failed("获取配置失败："+e.getMessage());
        }
    }
    /**
     * 获取对象流转默认配置
     */
    public Result getDxlzMrpz(MyParams myParams) {
        try {
            KettleService ks = new KettleService(myParams);
            JSONObject yobj = myParams.getJSONObject(KEY_YOBJ);
            JSONObject obj = myParams.getJSONObject(KEY_OBJ);
            obj.putAll(yobj);
            ks.setJob(obj.toJavaObject(VJob.class));
            return ks.getDxlzMrpz();
        } catch (Exception e) {
            log.error("获取对象流转默认配置失败"+getSjdx(), e);
            return failed("获取对象流转默认配置失败："+e.getMessage());
        }
    }
    public Result fzjl(MyParams myParams) {
        VJob yobj = myParams.getObject(KEY_YOBJ,VJob.class);
        //复制作业
        String[] jobPathArr = yobj.getMbzy().replace("\r", "").split("\n");
        JSONObject yJobJson = db(yobj.getZyk()).findFirst("select * from v_job where id_job=?",yobj.getId_job());
        StringBuilder successJob = new StringBuilder();
        StringBuilder failedJob = new StringBuilder();
        try {
            JobMeta yJob = Kettle.use(yobj.getZyk()).loadJob(yJobJson.getString("name"),
                    yJobJson.getLongValue("id_directory"));
            for(String jobPath:jobPathArr){
                jobPath = jobPath.replace("\\", "/");
                if(!jobPath.startsWith("/")){
                    jobPath = "/"+jobPath;
                }
                String dir = jobPath.substring(0, jobPath.lastIndexOf("/"));
                String name = jobPath.substring(jobPath.lastIndexOf("/")+1);
                if(StringUtils.isBlank(dir)){
                    dir = "/";
                }
                if(StringUtils.isBlank(name)){
                    failedJob.append(jobPath).append("[作业名称不能为空]\n");
                    continue;
                }
                yJob.setName(name);
                yJob.setRepositoryDirectory(Kettle.use(yobj.getZyk()).makeDirs(dir));
                //保存作业
                Kettle.use(yobj.getZyk()).saveJob(yJob);
                //更新作业扩展信息
                VJob kz = new VJob(yobj.getZyk(), Integer.parseInt(yJob.getObjectId().getId()));
                kz.setZylx(yJobJson.getString("zylx"));
                KettleManager.updateZykz(kz);
                successJob.append(jobPath).append("\n");
            }
        } catch (Exception e) {
            log.error("复制作业失败:"+yobj, e);
            return failed("复制作业失败："+e.getMessage()+"。\n复制成功的作业：\n"+successJob+"。\n复制失败的作业：\n"+failedJob);
        }
        return success("复制成功的作业：\n"+successJob+"。\n复制失败的作业：\n"+failedJob);
    }

    @Override
    public Result plbc(MyParams myParams) {
        myParams.set($_SYS_BATCH,false);
        return super.plbc(myParams);
    }
    @Override
    public Result jcxx(MyParams myParams) {
        myParams = (MyParams) super.jcxx(myParams).getData();
        if(StringUtil.isNotBlank(myParams.getString("$.yobj.id_job"))){
            if(!myParams.containsKey(KEY_OBJ)){
                //没有获取到数据库记录
                return success("获取基础信息成功",myParams);
            }
            VJob obj = myParams.getJSONObject(KEY_OBJ).toJavaObject(VJob.class);
            if(myParams.getBooleanValue("$.yobj.zyt")){
                //作业图场景
                try {
                    BufferedImage image = KettleManager.getJobImg(obj);
                    myParams.set("$.obj.zyt", Base64.getEncoder().encodeToString(FileUtil.toBytes(image)));
                    return success("获取基础信息成功",myParams);
                } catch (Exception e) {
                    throw new MyException("获取作业图失败",e);
                }
            }
            //修改
            try {
                String zylx = valByDef(obj.getZylx(),"cgzy");
                Result r1;
                KettleService ks = new KettleService(myParams);
                ks.setJob(obj);
                switch (zylx) {
                case "javascript":
                    r1  = ks.getJobJavascript();
                    break;
                case "sql":
                    r1 = ks.getJobSql();
                    break;
                case "km":
                    r1 = ks.getJobKm();
                    break;
                case "shell":
                    r1 = ks.getJobShell();
                    break;
                case "dxlz":
                    r1 = ks.getJobDxlz();
                    break;
                default:
                    r1 = ks.getJobInfo();
                    break;
                }
                if(r1.isStatus()){
                    myParams.put(KEY_OBJ,obj);
                }else{
                    throw r1.newMyException();
                }
            } catch (Exception e) {
                throw new MyException("获取作业信息失败",e,obj.toString());
            }
        }
        return success("获取基础信息成功",myParams);
    }
    @Override
    public Result save(MyParams myParams) {
        JSONObject yobj = myParams.getJSONObject(KEY_YOBJ);
        //更新时获取数据库中的记录信息,当查询不到时会采用yobj相同的值的克隆对象
        JSONObject obj = myParams.getJSONObject(KEY_OBJ);
        VJob vjob;
        Result r;
        if(myParams.getBooleanValue($_SYS_YZDJL)){
            myParams.set($_SYS_CLLX, KEY_CLLX_UPDATE);
        }else{
            myParams.set($_SYS_CLLX, KEY_CLLX_INSERT);
        }
        String cllx = getCllx(myParams);
        try {
            if(KEY_CLLX_UPDATE.equals(cllx)){
                //要用前端修改的数据覆盖部分属性，这里克隆一份,最终形成最新的记录信息
                obj = obj.clone();
                obj.putAll(yobj);
            }else {
                obj = yobj;
            }
            vjob = obj.toJavaObject(VJob.class);

            KettleService ks = new KettleService(myParams);
            ks.setJob(vjob);
            switch (vjob.getZylx()) {
            case "javascript":
                r = ks.editJobJavascript();
                break;
            case "sql":
                r = ks.editJobSql();
                break;
            case "km":
                r = ks.editJobKm();
                break;
            case "shell":
                r = ks.editJobShell();
                break;
            case "dxlz":
                r = ks.editJobDxlz();
                break;
            default:
                r = ks.editJobInfo();
                JobMeta jm = (JobMeta) r.getData();
                Kettle.use(vjob.getZyk()).saveJob(jm);
                break;
            }
        } catch (Exception e) {
            log.error("编辑作业失败："+obj,e);
            r = failed("编辑作业失败："+e.getMessage());
        }
        if(!r.isStatus()){
            return r;
        }
        if(r.isStatus()&&KEY_CLLX_INSERT.equals(cllx)){
            //取出作业元数据
            JobMeta jm = (JobMeta) r.getData();
            //设置作业主键到yobj
            yobj.put("id_job", Integer.parseInt(jm.getObjectId().getId()));
        }
        if(isBlank(yobj.get("zyk"))){
            yobj.put("zyk",obj.getString("zyk"));
        }
        return KettleManager.updateZykz(yobj.toJavaObject(VJob.class));
    }
}
